package com.deodar.kettle.platform.monitor.executor;

import com.deodar.kettle.platform.monitor.util.TaskManageUtil;
import com.deodar.common.utils.IDUtil;
import com.deodar.kettle.platform.common.App;
import com.deodar.kettle.platform.common.state.TaskStatusState;
import com.deodar.kettle.platform.common.util.KettleConstant;
import com.deodar.kettle.platform.common.util.KettleImagesUtil;
import com.deodar.kettle.platform.common.util.KettleSpringContext;
import com.deodar.kettle.platform.database.domain.RnExecutionLog;
import com.deodar.kettle.platform.database.service.IRnExecutionLogService;
import com.deodar.kettle.platform.database.service.impl.RnExecutionLogServiceImpl;
import com.deodar.kettle.platform.monitor.service.SlaveService;
import com.deodar.kettle.platform.monitor.util.KettleConfigUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.pentaho.di.cluster.SlaveServer;
import org.pentaho.di.core.Result;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobExecutionConfiguration;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.www.SlaveServerJobStatus;

import java.util.Date;
import java.util.concurrent.Callable;

/**
 * @author: Administrator
 * @Date: 2020/04/14 20:22
 * @Description:
 * @version:1.0.0
 */
@Slf4j
public class JobExecutor implements Callable<String> {

    JobExecutionConfiguration executionConfiguration;

    JobMeta jobMeta;

    Job job = null;

    boolean isClickStop=false; //判断用户是否已经点过了停止

    String carteObjectId;

    String logId;//日志id

    RnExecutionLog executionLog;

    String key;

    long errCount = 0;

    public JobExecutor(JobExecutionConfiguration executionConfiguration, JobMeta jobMeta, String logId) {
        this.executionConfiguration = executionConfiguration;
        this.jobMeta = jobMeta;
        this.logId = logId;
    }

    @Override
    public String call() throws Exception {
        run();
        return null;
    }

    private  void run(){
        IRnExecutionLogService rnExecutionLogService = KettleSpringContext.getBean(RnExecutionLogServiceImpl.class);
        boolean insertFlag = false;
        if(logId == null || "".equals(logId)){
            logId = IDUtil.getSnowflakeId();
            log.info("job logId:"+logId);
            executionLog = new RnExecutionLog();
            executionLog.setId(logId);
            insertFlag = true;
        }else {
            executionLog = rnExecutionLogService.selectRnExecutionLogById(logId);
        }

        key = KettleConstant.KEY_PREFIX+logId;
        try {
            SlaveService slaveService = KettleSpringContext.getBean(SlaveService.class);
            SlaveServer slaveServer = slaveService.getMinLoad();

            executionConfiguration.setRemoteServer(slaveServer);
            int status = TaskStatusState.STATE_RUNNING.getStatus();

            executionLog.setStartTime(new Date());
            executionLog.setStatus(status);
            executionLog.setTaskName(jobMeta.getName());
            executionLog.setTaskType(2);
            executionLog.setExecutionMethod("远程："+slaveServer.getHostname());

            executionConfiguration.setRemoteServer(slaveServer);


            for (String varName : executionConfiguration.getVariables().keySet()) {
                String varValue = executionConfiguration.getVariables().get(varName);
                jobMeta.setVariable(varName, varValue);
            }

            for (String paramName : executionConfiguration.getParams().keySet()) {
                String paramValue = executionConfiguration.getParams().get(paramName);
                jobMeta.setParameterValue(paramName, paramValue);
            }

            executionLog.setStartTime(new Date());
            if(insertFlag){
                rnExecutionLogService.insertRnExecutionLog(executionLog);
            }else {
                rnExecutionLogService.updateRnExecutionLog(executionLog);
            }

            executionConfiguration.setRepository(App.getInstance().getRepository());
            carteObjectId = Job.sendToSlaveServer( jobMeta, executionConfiguration, App.getInstance().getRepository(),
                    App.getInstance().getMetaStore() );
            log.info("job carteObjectId:{}",carteObjectId);
            SlaveServer remoteSlaveServer = executionConfiguration.getRemoteServer();
            boolean running = true;
            while(running) {
                SlaveServerJobStatus jobStatus = remoteSlaveServer.getJobStatus(jobMeta.getName(), carteObjectId, 0);

                running = jobStatus.isRunning();
                System.out.println(jobStatus.getLoggingString());
                if(!running) {
                    Result result= jobStatus.getResult();
                    if(result !=null) {
                        errCount =result.getNrErrors();
                    }
                }
                Thread.sleep(30000);
            }

            if(errCount > 0){
                status= TaskStatusState.STATE_FAIL.getStatus();
            }else {
                status = TaskStatusState.STATE_SUCCESS.getStatus();
            }
            executionLog.setStatus(status);
            executionLog.setExecutionLog(getExecutionLog());
        }catch (Exception e){

            log.error(ExceptionUtils.getStackTrace(e));
            executionLog.setStatus(TaskStatusState.STATE_FAIL.getStatus());
            executionLog.setRemark(ExceptionUtils.getStackTrace(e));
        }finally {
            TaskManageUtil.getInstance().remove(key);
            if(executionLog.getEndTime() ==null) {
                executionLog.setEndTime(new Date());
            }
            String imgPath = KettleImagesUtil.getImagePath(executionConfiguration.getRemoteServer(), "job", executionLog.getTaskName(),
                    carteObjectId, KettleConfigUtil.getKettleImagePath()+"/job");
            executionLog.setImagePath(imgPath);
            rnExecutionLogService.updateRnExecutionLog(executionLog);
            log.info("[{}] 任务执行完成",executionLog.getTaskName());
        }

    }


    public void stop(){
        if(null!=job){
            job.stopAll();
        }
    }

    public void setClickStop(boolean clickStop) {
        this.isClickStop = clickStop;
    }

    public String getExecutionLog() throws Exception {

        SlaveServer remoteSlaveServer = executionConfiguration.getRemoteServer();
        SlaveServerJobStatus jobStatus = remoteSlaveServer.getJobStatus(jobMeta.getName(), carteObjectId, 0);
        return jobStatus.getLoggingString();

    }
}
